package com.reisdelay.redis;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
public class RedisDelayQueueImpl implements RedisDelayQueue, ApplicationContextAware {
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    private ZSetOperations<String, Object> zSet;
    private Collection<RedisConsumer> redisConsumerImpls;
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    @PostConstruct
    private void init() {
        this.zSet = redisTemplate.opsForZSet();
        scheduledExecutorService.scheduleWithFixedDelay(() -> {
            redisConsumerImpls.forEach(redisConsumerImpl -> {
                String key = redisConsumerImpl.getKey();
                Set<Object> data = this.pull(key);
                data.forEach(d -> {
                    redisConsumerImpl.Consumer(d, zSet);
                });
            });
        }, 10, 2000, TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean push(String key, Object data, Long expireTime) throws Exception {
        return zSet.add(key, data, expireTime);
    }

    private Set<Object> pull(String key) {
        Set<Object> data = zSet.rangeByScore(key, 0, System.currentTimeMillis());
        return data;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisConsumer> beansOfType = applicationContext.getBeansOfType(RedisConsumer.class);
        this.redisConsumerImpls = beansOfType.values();
    }
}
